package cn.doitedu.etl_trash;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">51doit</a>
 * @QQ: 657270652
 * @Date: 2022/12/9
 * @Desc: Mall user-behavior event logs: enrichment with common dimensions.
 * Test data (push these records into Kafka with a Kafka producer):
 * {"username":"test","eventId":"add_cart","eventTime":17823756346,"lng":140.897898,"lat":50.87235,"properties":{"pageId":"pg001","itemId":"item002"}}
 * {"username":"aewen","eventId":"add_cart","eventTime":17823756346,"lng":140.897898,"lat":50.87235,"properties":{"pageId":"pg001","itemId":"item002"}}
 * {"username":"lion","eventId":"add_cart","eventTime":17823756346,"lng":140.897898,"lat":50.87235,"properties":{"pageId":"pg001","itemId":"item002"}}
 * {"username":"lisi","eventId":"add_cart","eventTime":17823756346,"lng":140.897898,"lat":50.87235,"properties":{"pageId":"pg001","itemId":"item002"}}
 * {"username":"wangwu","eventId":"add_cart","eventTime":17823756346,"lng":140.897898,"lat":50.87235,"properties":{"pageId":"pg001","itemId":"item002"}}
 * {"username":"windy","eventId":"add_cart","eventTime":17823756346,"lng":140.897898,"lat":50.87235,"properties":{"pageId":"pg001","itemId":"item002"}}
 **/
public class EtlJob_MallUserEventCommonDim {

    /** Interval between two checkpoints, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 2000L;

    /** Location where checkpoint data is stored (a local Windows path). */
    private static final String CHECKPOINT_STORAGE_URI = "file:/d:/checkpoint";

    /**
     * Job entry point: configures the streaming environment, registers the Kafka event
     * source and the HBase user dimension table, then prints the events joined with the
     * user dimension.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage(CHECKPOINT_STORAGE_URI);
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        registerEventSource(tEnv);
        registerUserDimension(tEnv);
        printEventsJoinedWithUser(tEnv);

        // TODO: create the HBase connector table for the geohash -> region dimension.

        // TODO: create the HBase connector table for the page-information dimension.
    }

    /**
     * Registers the Kafka connector source table {@code mall_events_kafkasource}
     * holding the user-behavior events.
     *
     * @param tEnv table environment in which the table is created
     */
    private static void registerEventSource(StreamTableEnvironment tEnv) {
        // lat/lng are declared DOUBLE: with FLOAT the sample longitude 140.897898 was
        // printed as 140.8979, i.e. the coordinates lost precision on ingestion.
        // proc_time is a computed processing-time column; the lookup join below uses it
        // in its FOR SYSTEM_TIME AS OF clause.
        tEnv.executeSql(
                " CREATE TABLE mall_events_kafkasource (               "
                        + "     username     string,                            "
                        + "     eventId      string,                            "
                        + "     eventTime    bigint,                            "
                        + "     lat          double,                            "
                        + "     lng          double,                            "
                        + "     properties   map<string,string>,                "
                        + "     proc_time   AS PROCTIME()                       "
                        + " ) WITH (                                            "
                        + "  'connector' = 'kafka',                             "
                        + "  'topic' = 'mall_events_kafkasource',               "
                        + "  'properties.bootstrap.servers' = 'doitedu:9092',   "
                        + "  'properties.group.id' = 'testGroup',               "
                        + "  'scan.startup.mode' = 'earliest-offset',           "
                        + "  'value.format'='json',                             "
                        + "  'value.json.fail-on-missing-field'='false',        "
                        + "  'value.fields-include' = 'EXCEPT_KEY'              "
                        + " )                                                   ");

        // Debug aid: uncomment to inspect the raw source rows.
        //tEnv.executeSql("select * from mall_events_kafkasource").print();
    }

    /**
     * Registers the HBase connector table {@code ums_member_hbasesource}, the user
     * registration dimension backed by the HBase table {@code dim_user_info}.
     *
     * @param tEnv table environment in which the table is created
     */
    private static void registerUserDimension(StreamTableEnvironment tEnv) {
        // Column f.q carries the user record as a JSON string; rows observed by the
        // original author start with {"create_time":"2018-11-12 ...
        tEnv.executeSql(
                "CREATE TABLE ums_member_hbasesource ( " +
                " username STRING,                              " +
                " f ROW<q STRING>,                              " +
                " PRIMARY KEY (username) NOT ENFORCED           " +
                ") WITH (                                       " +
                " 'connector' = 'hbase-2.2',                    " +
                " 'table-name' = 'dim_user_info',               " +
                " 'zookeeper.quorum' = 'doitedu:2181'           " +
                ")");

        // Debug aid: uncomment to inspect the dimension rows.
        //tEnv.executeSql("select username,f.q from ums_member_hbasesource").print();
    }

    /**
     * Joins every event with the user dimension row of the same {@code username} and
     * prints username, eventId, eventTime and the user JSON column {@code q}.
     *
     * @param tEnv table environment in which both tables are already registered
     */
    private static void printEventsJoinedWithUser(StreamTableEnvironment tEnv) {
        // NOTE(review): this is an inner JOIN, so an event whose username has no row in
        // the dimension table is not emitted - confirm that dropping such events is
        // intended (a LEFT JOIN would keep them with a NULL q).
        tEnv.executeSql(
                "SELECT e.username,e.eventId,e.eventTime, " +
                "u.f.q   " +
                "FROM mall_events_kafkasource AS e  " +
                "JOIN ums_member_hbasesource FOR SYSTEM_TIME AS OF e.proc_time AS u\n" +
                "ON e.username = u.username ").print();
        /*
        Sample output recorded by the original author:
        +----+----------+----------+-------------+--------------------------------+
        | op | username |  eventId |   eventTime |                              q |
        +----+----------+----------+-------------+--------------------------------+
        | +I |    windy | add_cart | 17823756346 | {"create_time":"2018-08-03 ... |
        | +I |   wangwu | add_cart | 17823756346 | {"create_time":"2018-11-12 ... |
         */
    }
}
